#include <ctype.h>
#include <signal.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <syslog.h>
#include <time.h>
#include <sys/time.h>
#include <getopt.h>

#include </usr/local/include/librdkafka/rdkafka.h>


static int run = 1;
static rd_kafka_t *rk;
static int exit_eof = 0;
static int quiet = 0;
static enum {
    OUTPUT_HEXDUMP,
    OUTPUT_RAW,
} output = OUTPUT_HEXDUMP;

static void stop(int sig) {
    run = 0;
    fclose(stdin); /* abort fgets() */
}


static void hexdump(FILE *fp, const char *name, const void *ptr, size_t len) {
    const char *p = (const char *) ptr;
    size_t of = 0;


    if (name)
        fprintf(fp, "%s hexdump (%zd bytes):\n", name, len);

    for (of = 0; of < len; of += 16) {
        char hexen[16 * 3 + 1];
        char charen[16 + 1];
        int hof = 0;

        int cof = 0;
        int i;

        for (i = of; i < (int) of + 16 && i < (int) len; i++) {
            hof += sprintf(hexen + hof, "%02x ", p[i] & 0xff);
            cof += sprintf(charen + cof, "%c",
                           isprint((int) p[i]) ? p[i] : '.');
        }
        fprintf(fp, "%08zx: %-48s %-16s\n",
                of, hexen, charen);
    }
}

/**
 * Kafka logger callback (optional)
 */
static void logger(const rd_kafka_t *rk, int level,
                   const char *fac, const char *buf) {
    struct timeval tv;
    gettimeofday(&tv, NULL);
    fprintf(stderr, "%u.%03u RDKAFKA-%i-%s: %s: %s\n",
            (int) tv.tv_sec, (int) (tv.tv_usec / 1000),
            level, fac, rk ? rd_kafka_name(rk) : NULL, buf);
}


/**
 * Message delivery report callback using the richer rd_kafka_message_t object.
 */
static void msg_delivered(rd_kafka_t *rk,
                          const rd_kafka_message_t *rkmessage, void *opaque) {
    if (rkmessage->err)
        fprintf(stderr, "%% Message delivery failed: %s\n",
                rd_kafka_err2str(rkmessage->err));
    else if (!quiet)
        fprintf(stderr,
                "%% Message delivered (%zd bytes, offset %"PRId64", "
                "partition %"PRId32"): %.*s\n",
                rkmessage->len, rkmessage->offset,
                rkmessage->partition,
                (int) rkmessage->len, (const char *) rkmessage->payload);
}


static void msg_consume(rd_kafka_message_t *rkmessage,
                        void *opaque) {
    if (rkmessage->err) {
        if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
            fprintf(stderr,
                    "%% Consumer reached end of %s [%"PRId32"] "
                    "message queue at offset %"PRId64"\n",
                    rd_kafka_topic_name(rkmessage->rkt),
                    rkmessage->partition, rkmessage->offset);

            if (exit_eof)
                run = 0;

            return;
        }

        fprintf(stderr, "%% Consume error for topic \"%s\" [%"PRId32"] "
                        "offset %"PRId64": %s\n",
                rd_kafka_topic_name(rkmessage->rkt),
                rkmessage->partition,
                rkmessage->offset,
                rd_kafka_message_errstr(rkmessage));

        if (rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION ||
            rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
            run = 0;
        return;
    }

    if (!quiet) {
        rd_kafka_timestamp_type_t tstype;
        int64_t timestamp;
        rd_kafka_headers_t *hdrs;

        fprintf(stdout, "%% Message (offset %"PRId64", %zd bytes):\n",
                rkmessage->offset, rkmessage->len);

        timestamp = rd_kafka_message_timestamp(rkmessage, &tstype);
        if (tstype != RD_KAFKA_TIMESTAMP_NOT_AVAILABLE) {
            const char *tsname = "?";
            if (tstype == RD_KAFKA_TIMESTAMP_CREATE_TIME)
                tsname = "create time";
            else if (tstype == RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME)
                tsname = "log append time";

            fprintf(stdout, "%% Message timestamp: %s %"PRId64
                            " (%ds ago)\n",
                    tsname, timestamp,
                    !timestamp ? 0 :
                    (int) time(NULL) - (int) (timestamp / 1000));
        }

        if (!rd_kafka_message_headers(rkmessage, &hdrs)) {
            size_t idx = 0;
            const char *name;
            const void *val;
            size_t size;

            fprintf(stdout, "%% Headers:");

            while (!rd_kafka_header_get_all(hdrs, idx++,
                                            &name, &val, &size)) {
                fprintf(stdout, "%s%s=",
                        idx == 1 ? " " : ", ", name);
                if (val)
                    fprintf(stdout, "\"%.*s\"",
                            (int) size, (const char *) val);
                else
                    fprintf(stdout, "NULL");
            }
            fprintf(stdout, "\n");
        }
    }

    if (rkmessage->key_len) {
        if (output == OUTPUT_HEXDUMP)
            hexdump(stdout, "Message Key",
                    rkmessage->key, rkmessage->key_len);
        else
            printf("Key: %.*s\n",
                   (int) rkmessage->key_len, (char *) rkmessage->key);
    }

    if (output == OUTPUT_HEXDUMP)
        hexdump(stdout, "Message Payload",
                rkmessage->payload, rkmessage->len);
    else
        printf("%.*s\n",
               (int) rkmessage->len, (char *) rkmessage->payload);
}


static void metadata_print(const char *topic,
                           const struct rd_kafka_metadata *metadata) {
    int i, j, k;
    int32_t controllerid;

    printf("Metadata for %s (from broker %"PRId32": %s):\n",
           topic ?: "all topics",
           metadata->orig_broker_id,
           metadata->orig_broker_name);

    controllerid = rd_kafka_controllerid(rk, 0);


    /* Iterate brokers */
    printf(" %i brokers:\n", metadata->broker_cnt);
    for (i = 0; i < metadata->broker_cnt; i++)
        printf("  broker %"PRId32" at %s:%i%s\n",
               metadata->brokers[i].id,
               metadata->brokers[i].host,
               metadata->brokers[i].port,
               controllerid == metadata->brokers[i].id ?
               " (controller)" : "");

    /* Iterate topics */
    printf(" %i topics:\n", metadata->topic_cnt);
    for (i = 0; i < metadata->topic_cnt; i++) {
        const struct rd_kafka_metadata_topic *t = &metadata->topics[i];
        printf("  topic \"%s\" with %i partitions:",
               t->topic,
               t->partition_cnt);
        if (t->err) {
            printf(" %s", rd_kafka_err2str(t->err));
            if (t->err == RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE)
                printf(" (try again)");
        }
        printf("\n");

        /* Iterate topic's partitions */
        for (j = 0; j < t->partition_cnt; j++) {
            const struct rd_kafka_metadata_partition *p;
            p = &t->partitions[j];
            printf("    partition %"PRId32", "
                   "leader %"PRId32", replicas: ",
                   p->id, p->leader);

            /* Iterate partition's replicas */
            for (k = 0; k < p->replica_cnt; k++)
                printf("%s%"PRId32,
                       k > 0 ? "," : "", p->replicas[k]);

            /* Iterate partition's ISRs */
            printf(", isrs: ");
            for (k = 0; k < p->isr_cnt; k++)
                printf("%s%"PRId32,
                       k > 0 ? "," : "", p->isrs[k]);
            if (p->err)
                printf(", %s\n", rd_kafka_err2str(p->err));
            else
                printf("\n");
        }
    }
}


static void sig_usr1(int sig) {
    rd_kafka_dump(stdout, rk);
}

int main(int argc, char **argv) {
    rd_kafka_topic_t *rkt;
    char *brokers = "localhost:9092";
    char mode = 'C';
    char *topic = NULL;
    int partition = RD_KAFKA_PARTITION_UA;
    int opt;
    rd_kafka_conf_t *conf;
    rd_kafka_topic_conf_t *topic_conf;
    char errstr[512];
    int64_t start_offset = 0;
    int do_conf_dump = 0;
    char tmp[16];
    int64_t seek_offset = 0;
    int64_t tmp_offset = 0;
    int get_wmarks = 0;
    rd_kafka_headers_t *hdrs = NULL;
    rd_kafka_resp_err_t err;

    /* Kafka configuration */
    conf = rd_kafka_conf_new();

    /* Set logger */
    rd_kafka_conf_set_log_cb(conf, logger);

    /* Quick termination */
    snprintf(tmp, sizeof(tmp), "%i", SIGIO);
    rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0);

    /* Topic configuration */
    topic_conf = rd_kafka_topic_conf_new();

    while ((opt = getopt(argc, argv, "PCLt:p:b:z:qd:o:eX:As:H:")) != -1) {
        switch (opt) {
            case 'P':
            case 'C':
            case 'L':
                mode = opt;
                break;
            case 't':
                topic = optarg;
                break;
            case 'p':
                partition = atoi(optarg);
                break;
            case 'b':
                brokers = optarg;
                break;
            case 'z':
                if (rd_kafka_conf_set(conf, "compression.codec",
                                      optarg,
                                      errstr, sizeof(errstr)) !=
                    RD_KAFKA_CONF_OK) {
                    fprintf(stderr, "%% %s\n", errstr);
                    exit(1);
                }
                break;
            case 'o':
            case 's':
                if (!strcmp(optarg, "end"))
                    tmp_offset = RD_KAFKA_OFFSET_END;
                else if (!strcmp(optarg, "beginning"))
                    tmp_offset = RD_KAFKA_OFFSET_BEGINNING;
                else if (!strcmp(optarg, "stored"))
                    tmp_offset = RD_KAFKA_OFFSET_STORED;
                else if (!strcmp(optarg, "wmark"))
                    get_wmarks = 1;
                else {
                    tmp_offset = strtoll(optarg, NULL, 10);

                    if (tmp_offset < 0)
                        tmp_offset = RD_KAFKA_OFFSET_TAIL(-tmp_offset);
                }

                if (opt == 'o')
                    start_offset = tmp_offset;
                else if (opt == 's')
                    seek_offset = tmp_offset;
                break;
            case 'e':
                exit_eof = 1;
                break;
            case 'd':
                if (rd_kafka_conf_set(conf, "debug", optarg,
                                      errstr, sizeof(errstr)) !=
                    RD_KAFKA_CONF_OK) {
                    fprintf(stderr,
                            "%% Debug configuration failed: "
                            "%s: %s\n",
                            errstr, optarg);
                    exit(1);
                }
                break;
            case 'q':
                quiet = 1;
                break;
            case 'A':
                output = OUTPUT_RAW;
                break;
            case 'H': {
                char *name, *val;
                size_t name_sz = -1;

                name = optarg;
                val = strchr(name, '=');
                if (val) {
                    name_sz = (size_t) (val - name);
                    val++; /* past the '=' */
                }

                if (!hdrs)
                    hdrs = rd_kafka_headers_new(8);

                err = rd_kafka_header_add(hdrs, name, name_sz, val, -1);
                if (err) {
                    fprintf(stderr,
                            "%% Failed to add header %s: %s\n",
                            name, rd_kafka_err2str(err));
                    exit(1);
                }
            }
                break;

            case 'X': {
                char *name, *val;
                rd_kafka_conf_res_t res;

                if (!strcmp(optarg, "list") ||
                    !strcmp(optarg, "help")) {
                    rd_kafka_conf_properties_show(stdout);
                    exit(0);
                }

                if (!strcmp(optarg, "dump")) {
                    do_conf_dump = 1;
                    continue;
                }

                name = optarg;
                if (!(val = strchr(name, '='))) {
                    char dest[512];
                    size_t dest_size = sizeof(dest);
                    /* Return current value for property. */

                    res = RD_KAFKA_CONF_UNKNOWN;
                    if (!strncmp(name, "topic.", strlen("topic.")))
                        res = rd_kafka_topic_conf_get(
                                topic_conf,
                                name + strlen("topic."),
                                dest, &dest_size);
                    if (res == RD_KAFKA_CONF_UNKNOWN)
                        res = rd_kafka_conf_get(
                                conf, name, dest, &dest_size);

                    if (res == RD_KAFKA_CONF_OK) {
                        printf("%s = %s\n", name, dest);
                        exit(0);
                    } else {
                        fprintf(stderr,
                                "%% %s property\n",
                                res == RD_KAFKA_CONF_UNKNOWN ?
                                "Unknown" : "Invalid");
                        exit(1);
                    }
                }

                *val = '\0';
                val++;

                res = RD_KAFKA_CONF_UNKNOWN;
                /* Try "topic." prefixed properties on topic
                 * conf first, and then fall through to global if
                 * it didnt match a topic configuration property. */
                if (!strncmp(name, "topic.", strlen("topic.")))
                    res = rd_kafka_topic_conf_set(topic_conf,
                                                  name +
                                                  strlen("topic."),
                                                  val,
                                                  errstr,
                                                  sizeof(errstr));

                if (res == RD_KAFKA_CONF_UNKNOWN)
                    res = rd_kafka_conf_set(conf, name, val,
                                            errstr, sizeof(errstr));

                if (res != RD_KAFKA_CONF_OK) {
                    fprintf(stderr, "%% %s\n", errstr);
                    exit(1);
                }
            }
                break;

            default:
                goto usage;
        }
    }


    if (do_conf_dump) {
        const char **arr;
        size_t cnt;
        int pass;

        for (pass = 0; pass < 2; pass++) {
            int i;

            if (pass == 0) {
                arr = rd_kafka_conf_dump(conf, &cnt);
                printf("# Global config\n");
            } else {
                printf("# Topic config\n");
                arr = rd_kafka_topic_conf_dump(topic_conf,
                                               &cnt);
            }

            for (i = 0; i < (int) cnt; i += 2)
                printf("%s = %s\n",
                       arr[i], arr[i + 1]);

            printf("\n");

            rd_kafka_conf_dump_free(arr, cnt);
        }

        exit(0);
    }


    if (optind != argc || (mode != 'L' && !topic)) {
        usage:
        fprintf(stderr,
                "Usage: %s -C|-P|-L -t <topic> "
                "[-p <partition>] [-b <host1:port1,host2:port2,..>]\n"
                "\n"
                "librdkafka version %s (0x%08x)\n"
                "\n"
                " Options:\n"
                "  -C | -P         Consumer or Producer mode\n"
                "  -L              Metadata list mode\n"
                "  -t <topic>      Topic to fetch / produce\n"
                "  -p <num>        Partition (random partitioner)\n"
                "  -b <brokers>    Broker address (localhost:9092)\n"
                "  -z <codec>      Enable compression:\n"
                "                  none|gzip|snappy\n"
                "  -o <offset>     Start offset (consumer):\n"
                "                  beginning, end, NNNNN or -NNNNN\n"
                "                  wmark returns the current hi&lo "
                "watermarks.\n"
                "  -e              Exit consumer when last message\n"
                "                  in partition has been received.\n"
                "  -d [facs..]     Enable debugging contexts:\n"
                "                  %s\n"
                "  -q              Be quiet\n"
                "  -A              Raw payload output (consumer)\n"
                "  -H <name[=value]> Add header to message (producer)\n"
                "  -X <prop=name>  Set arbitrary librdkafka "
                "configuration property\n"
                "                  Properties prefixed with \"topic.\" "
                "will be set on topic object.\n"
                "  -X list         Show full list of supported "
                "properties.\n"
                "  -X dump         Show configuration\n"
                "  -X <prop>       Get single property value\n"
                "\n"
                " In Consumer mode:\n"
                "  writes fetched messages to stdout\n"
                " In Producer mode:\n"
                "  reads messages from stdin and sends to broker\n"
                " In List mode:\n"
                "  queries broker for metadata information, "
                "topic is optional.\n"
                "\n"
                "\n"
                "\n",
                argv[0],
                rd_kafka_version_str(), rd_kafka_version(),
                RD_KAFKA_DEBUG_CONTEXTS);
        exit(1);
    }

    if ((mode == 'C' && !isatty(STDIN_FILENO)) ||
        (mode != 'C' && !isatty(STDOUT_FILENO)))
        quiet = 1;


    signal(SIGINT, stop);
    signal(SIGUSR1, sig_usr1);

    if (mode == 'P') {
        /*
         * Producer
         */
        char buf[2048];
        int sendcnt = 0;

        /* Set up a message delivery report callback.
         * It will be called once for each message, either on successful
         * delivery to broker, or upon failure to deliver to broker. */
        rd_kafka_conf_set_dr_msg_cb(conf, msg_delivered);

        /* Create Kafka handle */
        if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
                                errstr, sizeof(errstr)))) {
            fprintf(stderr,
                    "%% Failed to create new producer: %s\n",
                    errstr);
            exit(1);
        }

        /* Add brokers */
        if (rd_kafka_brokers_add(rk, brokers) == 0) {
            fprintf(stderr, "%% No valid brokers specified\n");
            exit(1);
        }

        /* Create topic */
        rkt = rd_kafka_topic_new(rk, topic, topic_conf);
        topic_conf = NULL; /* Now owned by topic */

        if (!quiet)
            fprintf(stderr,
                    "%% Type stuff and hit enter to send\n");

        while (run && fgets(buf, sizeof(buf), stdin)) {
            size_t len = strlen(buf);
            if (buf[len - 1] == '\n')
                buf[--len] = '\0';

            err = RD_KAFKA_RESP_ERR_NO_ERROR;

            /* Send/Produce message. */
            if (hdrs) {
                rd_kafka_headers_t *hdrs_copy;

                hdrs_copy = rd_kafka_headers_copy(hdrs);

                err = rd_kafka_producev(
                        rk,
                        RD_KAFKA_V_RKT(rkt),
                        RD_KAFKA_V_PARTITION(partition),
                        RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
                        RD_KAFKA_V_VALUE(buf, len),
                        RD_KAFKA_V_HEADERS(hdrs_copy),
                        RD_KAFKA_V_END);

                if (err)
                    rd_kafka_headers_destroy(hdrs_copy);

            } else {
                if (rd_kafka_produce(
                        rkt, partition,
                        RD_KAFKA_MSG_F_COPY,
                        /* Payload and length */
                        buf, len,
                        /* Optional key and its length */
                        NULL, 0,
                        /* Message opaque, provided in
                         * delivery report callback as
                         * msg_opaque. */
                        NULL) == -1) {
                    err = rd_kafka_last_error();
                }
            }

            if (err) {
                fprintf(stderr,
                        "%% Failed to produce to topic %s "
                        "partition %i: %s\n",
                        rd_kafka_topic_name(rkt), partition,
                        rd_kafka_err2str(err));

                /* Poll to handle delivery reports */
                rd_kafka_poll(rk, 0);
                continue;
            }

            if (!quiet)
                fprintf(stderr, "%% Sent %zd bytes to topic "
                                "%s partition %i\n",
                        len, rd_kafka_topic_name(rkt), partition);
            sendcnt++;
            /* Poll to handle delivery reports */
            rd_kafka_poll(rk, 0);
        }

        /* Poll to handle delivery reports */
        rd_kafka_poll(rk, 0);

        /* Wait for messages to be delivered */
        while (run && rd_kafka_outq_len(rk) > 0)
            rd_kafka_poll(rk, 100);

        /* Destroy topic */
        rd_kafka_topic_destroy(rkt);

        /* Destroy the handle */
        rd_kafka_destroy(rk);

    } else if (mode == 'C') {
        /*
         * Consumer
         */

        rd_kafka_conf_set(conf, "enable.partition.eof", "true",
                          NULL, 0);

        /* Create Kafka handle */
        if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
                                errstr, sizeof(errstr)))) {
            fprintf(stderr,
                    "%% Failed to create new consumer: %s\n",
                    errstr);
            exit(1);
        }

        /* Add brokers */
        if (rd_kafka_brokers_add(rk, brokers) == 0) {
            fprintf(stderr, "%% No valid brokers specified\n");
            exit(1);
        }

        if (get_wmarks) {
            int64_t lo, hi;
            rd_kafka_resp_err_t err;

            /* Only query for hi&lo partition watermarks */

            if ((err = rd_kafka_query_watermark_offsets(
                    rk, topic, partition, &lo, &hi, 5000))) {
                fprintf(stderr, "%% query_watermark_offsets() "
                                "failed: %s\n",
                        rd_kafka_err2str(err));
                exit(1);
            }

            printf("%s [%d]: low - high offsets: "
                   "%"PRId64" - %"PRId64"\n",
                   topic, partition, lo, hi);

            rd_kafka_destroy(rk);
            exit(0);
        }


        /* Create topic */
        rkt = rd_kafka_topic_new(rk, topic, topic_conf);
        topic_conf = NULL; /* Now owned by topic */

        /* Start consuming */
        if (rd_kafka_consume_start(rkt, partition, start_offset) == -1) {
            rd_kafka_resp_err_t err = rd_kafka_last_error();
            fprintf(stderr, "%% Failed to start consuming: %s\n",
                    rd_kafka_err2str(err));
            if (err == RD_KAFKA_RESP_ERR__INVALID_ARG)
                fprintf(stderr,
                        "%% Broker based offset storage "
                        "requires a group.id, "
                        "add: -X group.id=yourGroup\n");
            exit(1);
        }

        while (run) {
            rd_kafka_message_t *rkmessage;
            rd_kafka_resp_err_t err;

            /* Poll for errors, etc. */
            rd_kafka_poll(rk, 0);

            /* Consume single message.
             * See rdkafka_performance.c for high speed
             * consuming of messages. */
            rkmessage = rd_kafka_consume(rkt, partition, 1000);
            if (!rkmessage) /* timeout */
                continue;

            msg_consume(rkmessage, NULL);

            /* Return message to rdkafka */
            rd_kafka_message_destroy(rkmessage);

            if (seek_offset) {
                err = rd_kafka_seek(rkt, partition, seek_offset,
                                    2000);
                if (err)
                    printf("Seek failed: %s\n",
                           rd_kafka_err2str(err));
                else
                    printf("Seeked to %"PRId64"\n",
                           seek_offset);
                seek_offset = 0;
            }
        }

        /* Stop consuming */
        rd_kafka_consume_stop(rkt, partition);

        while (rd_kafka_outq_len(rk) > 0)
            rd_kafka_poll(rk, 10);

        /* Destroy topic */
        rd_kafka_topic_destroy(rkt);

        /* Destroy handle */
        rd_kafka_destroy(rk);

    } else if (mode == 'L') {
        rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;

        /* Create Kafka handle */
        if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
                                errstr, sizeof(errstr)))) {
            fprintf(stderr,
                    "%% Failed to create new producer: %s\n",
                    errstr);
            exit(1);
        }

        /* Add brokers */
        if (rd_kafka_brokers_add(rk, brokers) == 0) {
            fprintf(stderr, "%% No valid brokers specified\n");
            exit(1);
        }

        /* Create topic */
        if (topic) {
            rkt = rd_kafka_topic_new(rk, topic, topic_conf);
            topic_conf = NULL; /* Now owned by topic */
        } else
            rkt = NULL;

        while (run) {
            const struct rd_kafka_metadata *metadata;

            /* Fetch metadata */
            err = rd_kafka_metadata(rk, rkt ? 0 : 1, rkt,
                                    &metadata, 5000);
            if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
                fprintf(stderr,
                        "%% Failed to acquire metadata: %s\n",
                        rd_kafka_err2str(err));
                run = 0;
                break;
            }

            metadata_print(topic, metadata);

            rd_kafka_metadata_destroy(metadata);
            run = 0;
        }

        /* Destroy topic */
        if (rkt)
            rd_kafka_topic_destroy(rkt);

        /* Destroy the handle */
        rd_kafka_destroy(rk);

        if (topic_conf)
            rd_kafka_topic_conf_destroy(topic_conf);


        /* Exit right away, dont wait for background cleanup, we haven't
         * done anything important anyway. */
        exit(err ? 2 : 0);
    }

    if (hdrs)
        rd_kafka_headers_destroy(hdrs);

    if (topic_conf)
        rd_kafka_topic_conf_destroy(topic_conf);

    /* Let background threads clean up and terminate cleanly. */
    run = 5;
    while (run-- > 0 && rd_kafka_wait_destroyed(1000) == -1)
        printf("Waiting for librdkafka to decommission\n");
    if (run <= 0)
        rd_kafka_dump(stdout, rk);

    return 0;
}
